package com.atguigu.flink.chapter06;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashSet;
import java.util.Set;

/**
 * Counts ad clicks per province/ad pair from a CSV click log and prints the rolling totals.
 *
 * @author cjp
 * @version 1.0
 * @date 2021/1/22 13:52
 */
public class Flink09_AdClickAnalysis {

    /** Location of the CSV click log consumed by the job. */
    private static final String CLICK_LOG_PATH = "input/AdClickLog.csv";

    /**
     * Builds and executes the streaming job: reads the click log as text lines, parses each
     * line into an {@code AdsClickLog}, pairs every record with a count of one under a
     * {@code province_adId} key, sums the counts per key and prints the results.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.readTextFile(CLICK_LOG_PATH)
                // 1. Read the data and convert each line into a POJO.
                .map(new CsvLineToClickLog())
                // 2. Emit (key, 1) for every click record.
                .map(click -> Tuple2.of(statKey(click), 1L))
                // The tuple's generic types are declared explicitly for the lambda above.
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // 3. Group by the statistics dimension: province and ad.
                .keyBy(pair -> pair.f0)
                .sum(1)
                .print();

        env.execute();
    }

    /** Builds the grouping key by joining the record's province and ad id with an underscore. */
    private static String statKey(AdsClickLog click) {
        return click.getProvince() + "_" + click.getAdId();
    }

    /**
     * Splits a comma-separated line and passes its first five columns to the
     * {@code AdsClickLog} constructor; columns 0, 1 and 4 are parsed as {@code Long}.
     */
    private static final class CsvLineToClickLog implements MapFunction<String, AdsClickLog> {
        @Override
        public AdsClickLog map(String line) {
            final String[] columns = line.split(",");
            return new AdsClickLog(
                    Long.valueOf(columns[0]),
                    Long.valueOf(columns[1]),
                    columns[2],
                    columns[3],
                    Long.valueOf(columns[4]));
        }
    }
}
